package com.atguigu.gmall.realtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.atguigu.gmall.realtime.app.func.DimSink;
import com.atguigu.gmall.realtime.app.func.MyDebeziumDeserializationSchema;
import com.atguigu.gmall.realtime.app.func.TableProcessFunction;
import com.atguigu.gmall.realtime.beans.TableProcess;
import com.atguigu.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;

/**
 * Author: Felix
 * Date: 2021/12/26
 * Desc: Dynamic splitting of business-database change data into fact and dimension streams.
 *
 * <p>Processes that must be running: ZooKeeper, Kafka, Maxwell, HDFS, HBase, MySQL, BaseDBApp.
 *
 * <p>End-to-end flow:
 * <ol>
 *   <li>A table in the business database changes.</li>
 *   <li>The change is recorded in the MySQL binlog.</li>
 *   <li>Maxwell reads the change from the binlog, encodes it as a JSON string and publishes it
 *       to the Kafka topic {@code ods_base_db_m}.</li>
 *   <li>BaseDBApp consumes {@code ods_base_db_m} and:
 *     <ul>
 *       <li>prepares the execution environment;</li>
 *       <li>configures checkpointing (currently commented out in {@code main});</li>
 *       <li>reads the records from Kafka and parses each one into a {@link JSONObject};</li>
 *       <li>applies a simple ETL filter;</li>
 *       <li>reads the configuration table with FlinkCDC to form a configuration stream;</li>
 *       <li>broadcasts the configuration stream;</li>
 *       <li>connects the business stream with the broadcast configuration stream;</li>
 *       <li>processes the connected stream with {@code TableProcessFunction}, which is intended to:
 *         <ul>
 *           <li>broadcast side: wrap each configuration row in a {@link TableProcess}, put it into
 *               broadcast state, and create the matching dimension table (build the CREATE TABLE
 *               statement and execute it over JDBC);</li>
 *           <li>business side: build a key from the table name and operation type, look up the
 *               matching configuration in broadcast state, route the record accordingly, and
 *               filter its fields;</li>
 *         </ul>
 *       </li>
 *       <li>writes dimension records (side output) through {@code DimSink} into Phoenix tables, and
 *           fact records (main output) to the Kafka topic named by each record's
 *           {@code sink_table} field.</li>
 *     </ul>
 *   </li>
 * </ol>
 */
public class BaseDBApp {
    public static void main(String[] args) throws Exception {
        // TODO 1. Basic environment setup
        // 1.1 Streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.2 Parallelism
        env.setParallelism(4);
        /*
        // TODO 2. Checkpointing -- currently disabled. Re-enable it (with a real state-backend
        //         path and HADOOP_USER_NAME) before running in production.
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
        env.setStateBackend(new FsStateBackend("xxxx"));
        System.setProperty("HADOOP_USER_NAME","xxxx");
        */

        // TODO 3. Read from Kafka
        // 3.1 Source topic and consumer group
        String topic = "ods_base_db_m";
        String groupId = "base_db_app_group";
        // 3.2 Consumer
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
        // 3.3 Consume the topic into a stream
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

        // TODO 4. Convert each record: JSON string -> JSONObject
        // NOTE(review): a record that is not valid JSON makes JSON.parseObject throw, which fails
        // the job; consider skipping or side-outputting malformed records.
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

        // TODO 5. Simple ETL: drop records without a table name or with an empty payload
        SingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter(BaseDBApp::isValidChangeRecord);

        //filterDS.print(">>>>>");

        // TODO 6. Read the configuration table with FlinkCDC to obtain a configuration stream
        DataStreamSource<String> mysqlDS = env.addSource(createConfigSource());

        // TODO 7. Broadcast the configuration stream
        MapStateDescriptor<String, TableProcess> mapStateDescriptor =
            new MapStateDescriptor<>("mapStateDescriptor", String.class, TableProcess.class);
        BroadcastStream<String> broadcastDS = mysqlDS.broadcast(mapStateDescriptor);

        // TODO 8. Connect the business stream with the broadcast configuration stream
        BroadcastConnectedStream<JSONObject, String> connectDS = filterDS.connect(broadcastDS);

        // TODO 9. Dynamic splitting: dimension records -> side output, fact records -> main output
        // The trailing {} creates an anonymous subclass so Flink can capture the JSONObject type argument.
        OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dimTag") {};

        SingleOutputStreamOperator<JSONObject> factDS =
            connectDS.process(new TableProcessFunction(dimTag, mapStateDescriptor));

        DataStream<JSONObject> dimDS = factDS.getSideOutput(dimTag);
        // Debug output: prints every fact and dimension record to standard output.
        factDS.print(">>>>");
        dimDS.print("####");

        // TODO 10. Save the dimension side output into Phoenix tables
        dimDS.addSink(new DimSink());

        // TODO 11. Save the fact main output into Kafka topics
        factDS.addSink(MyKafkaUtil.getKafkaSinkBySchema(new FactRecordSerializationSchema()));

        env.execute();
    }

    /**
     * ETL predicate for Maxwell change records.
     *
     * @param jsonObj a parsed record from the source topic
     * @return {@code true} when the record has a non-empty {@code table} field and a {@code data}
     *         object whose JSON text is longer than 3 characters (this rejects {@code {}}, i.e.
     *         records with an empty payload)
     */
    private static boolean isValidChangeRecord(JSONObject jsonObj) {
        String table = jsonObj.getString("table");
        return table != null
            && table.length() > 0
            && jsonObj.getJSONObject("data") != null
            && jsonObj.getString("data").length() > 3;
    }

    /**
     * Builds the FlinkCDC source that reads {@code gmall0701_realtime.table_process}, deserializes
     * rows with {@link MyDebeziumDeserializationSchema}, and starts with
     * {@link StartupOptions#initial()}.
     *
     * @return the configuration source function emitting one string per change
     */
    private static DebeziumSourceFunction<String> createConfigSource() {
        // NOTE(review): host and credentials are hard-coded; move them to external configuration
        // before deploying anywhere but a local test cluster.
        return MySQLSource.<String>builder()
            .hostname("hadoop202")
            .port(3306)
            .databaseList("gmall0701_realtime")
            .tableList("gmall0701_realtime.table_process")
            .username("root")
            .password("123456")
            .deserializer(new MyDebeziumDeserializationSchema())
            .startupOptions(StartupOptions.initial())
            .build();
    }

    /**
     * Serializes a fact record into a Kafka record.
     *
     * <p>The target topic is the record's {@code sink_table} field. The value is the record's
     * {@code data} object as UTF-8 JSON text. No key is set.
     */
    private static final class FactRecordSerializationSchema implements KafkaSerializationSchema<JSONObject> {
        private static final long serialVersionUID = 1L;

        @Override
        public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {
            String sinkTopic = jsonObj.getString("sink_table");
            JSONObject dataJsonObj = jsonObj.getJSONObject("data");
            // Encode explicitly as UTF-8: String#getBytes() uses the platform default charset,
            // which corrupts non-ASCII (e.g. Chinese) values on hosts whose default is not UTF-8.
            byte[] value = dataJsonObj.toJSONString().getBytes(StandardCharsets.UTF_8);
            return new ProducerRecord<>(sinkTopic, value);
        }
    }
}
